package org.lisy.distributed.lock.zookeeper;

import java.sql.Timestamp;
import java.util.concurrent.TimeUnit;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreMutex;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.curator.retry.ExponentialBackoffRetry;

/**
 * Demo of a distributed, non-reentrant exclusive lock based on Curator's
 * {@link InterProcessSemaphoreMutex}.
 * <p>
 * {@link #main(String[])} repeatedly acquires the lock, holds it for a while
 * (trying once to re-acquire it to show that it is not re-entrant) and then
 * releases it. The instance also registers itself as a
 * {@link ConnectionStateListener} and mirrors the connection state into
 * {@link #connectionOk}.
 *
 * @author lisy
 */
public class InterProcessSemaphoreMutexLock implements ConnectionStateListener {

	/** ZooKeeper node under which the lock is taken. */
	private static final String LOCK_PATH = "/distributed/lock/leader";

	/**
	 * Whether the last reported connection state was CONNECTED or RECONNECTED.
	 * Written from {@link #stateChanged} and polled by the loop in
	 * {@link #main}; volatile so that an update made by the listener callback
	 * is visible to the polling thread.
	 */
	static volatile boolean connectionOk = false;

	/**
	 * Runs the acquire / hold / release cycle until the thread is interrupted.
	 * Any other exception is printed and the cycle is retried after 2 seconds.
	 *
	 * @param args unused
	 */
	public static void main(String[] args) {
		InterProcessSemaphoreMutexLock main = new InterProcessSemaphoreMutexLock();
		CuratorFramework client = main.init();
		boolean interrupted = false;
		try {
			while (true) {
				try {
					if (!connectionOk) {
						log(" not connection zookeeper !");
						TimeUnit.SECONDS.sleep(5);
						continue;
					}
					InterProcessSemaphoreMutex lock = new InterProcessSemaphoreMutex(client, LOCK_PATH);
					// acquire with a timeout gives up after 5s instead of blocking indefinitely
					if (!lock.acquire(5, TimeUnit.SECONDS)) {
						log(" not get lock, wait 2 second !");
						TimeUnit.SECONDS.sleep(2);
						continue;
					}
					log(" get lock");
					holdLock(lock);
					log(" compalte once work");
					TimeUnit.SECONDS.sleep(2);
				} catch (InterruptedException e) {
					// Interruption is a request to stop: leave the loop instead of retrying.
					interrupted = true;
					break;
				} catch (Exception e) {
					e.printStackTrace();
					try {
						TimeUnit.SECONDS.sleep(2);
					} catch (InterruptedException e1) {
						interrupted = true;
						break;
					}
				}
			}
		} finally {
			// Close first, then restore the interrupt flag, so that closing the
			// client is not itself disturbed by a pending interrupt.
			client.close();
			if (interrupted) {
				Thread.currentThread().interrupt();
			}
		}
	}

	/**
	 * Holds an already-acquired lock for 20 rounds of 2 seconds each, then
	 * releases it. At round 10 it tries to acquire the same lock again with a
	 * 2 second timeout and prints whether that succeeded.
	 * <p>
	 * If this method exits abnormally while the lock is still held, the lock
	 * is released before the exception propagates.
	 *
	 * @param lock a lock the caller has just acquired
	 * @throws Exception if a lock operation fails or the thread is interrupted
	 */
	private static void holdLock(InterProcessSemaphoreMutex lock) throws Exception {
		try {
			int count = 0;
			while (true) {
				if (count == 10) {
					// The lock is documented as non-reentrant, so this second
					// acquire is expected to time out while we still hold it.
					if (lock.acquire(2, TimeUnit.SECONDS)) {
						log(" get lock again !");
						lock.release();
						log(" lock release !");
					} else {
						log(" not get lock again !");
					}
				} else if (count == 20) {
					lock.release();
					log(" lock release !");
					break;
				}
				log(" lock effective, wait 2 second !");
				count++;
				TimeUnit.SECONDS.sleep(2);
			}
		} finally {
			// Only true on the exceptional path: the normal path has already released.
			if (lock.isAcquiredInThisProcess()) {
				try {
					lock.release();
					log(" lock release !");
				} catch (Exception releaseError) {
					// Report but do not mask the exception that brought us here.
					releaseError.printStackTrace();
				}
			}
		}
	}

	/**
	 * Prints {@code message} to standard output prefixed with the current timestamp.
	 *
	 * @param message text appended directly after the timestamp
	 */
	private static void log(String message) {
		System.out.println(new Timestamp(System.currentTimeMillis()) + message);
	}

	/**
	 * Creates and starts a Curator client for a local ZooKeeper and registers
	 * this instance as its connection state listener.
	 *
	 * @return the started client; the caller is responsible for closing it
	 */
	public CuratorFramework init() {
		// Client retry policy: initial sleep time 1000ms, at most 3 retries
		RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
		// ZooKeeper address; single node: (127.0.0.1:2181),
		// cluster: (127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183)
		String path = "127.0.0.1:2181";
		// Curator client: 60000(ms) session timeout, 15000(ms) connection timeout
		CuratorFramework client = CuratorFrameworkFactory.newClient(path, 60000, 15000, retryPolicy);
		client.getConnectionStateListenable().addListener(this);
		client.start();
		return client;
	}

	/**
	 * Records whether the client is currently connected.
	 *
	 * @param client   the client whose state changed
	 * @param newState the new connection state; CONNECTED and RECONNECTED set
	 *                 {@link #connectionOk} to true, every other state to false
	 */
	@Override
	public void stateChanged(CuratorFramework client, ConnectionState newState) {
		log(" access stateChanged :" + newState);
		connectionOk = ConnectionState.CONNECTED.equals(newState)
				|| ConnectionState.RECONNECTED.equals(newState);
	}
}
